gRPC - Authentication and Signature Verification
After FI is ready with the endpoints for inbound message processing, they would have to share the API endpoint URL and credentials. NetXD will use this to configure the connectivity with the FIs core. NetXD will generate the key pair and share the public key with FI.
Introduction to gRPC
The Inbound APIs are built using gRPC (gRPC Remote Procedure Call), a high-performance, open-source RPC framework facilitating communication between clients and servers in a distributed system.
One of the key components of gRPC is its utilization of HTTP/2 as the underlying transport protocol for several advantages, including enhanced performance and efficiency. The HTTP/2 enables multiple requests and responses to be sent over a single connection simultaneously.
gRPC relies on Protocol Buffers (Protobuf) as the interface description language. Protocol Buffers provides a structured and efficient way to define the data structures and operations exchanged between clients and servers.
gRPC offers a range of advanced features such as Authentication mechanisms that are built into the framework, allowing clients and servers to verify each other's identities before exchanging sensitive information
gRPC provides support for load balancing, enabling incoming requests to be distributed across multiple server instances for improved reliability.
Getting started with gRPC
Tools Required
gRPC Runtime - A tool specific to a programming language that allows developers build and uses gRPC services. For example, "grpc-java" is the gRPC runtime for Java, and "grpc-python" is for Python
Protocol Buffers compiler (protoc) - A tool used to compile .proto files
Installing Protocol Buffers Compiler
Install the Protocol Buffers Compiler using a package manager with the following commands.
sudo apt-get install -y protobuf-compiler
Installing Go Packages
Install the Go Packages using a package manager with the following commands.
go get google.golang.org/grpc
go get google.golang.org/protobuf/cmd/protoc-gen-go
go get google.golang.org/grpc/cmd/protoc-gen-go-grpc
Service Definitions
Service definitions are the specifications that outline the structure, methods, and behaviors of the services provided by the server and the corresponding actions that clients can take. These definitions are written in Protocol Buffers (.proto) files.
Click here to download the (.proto) file for this gRPC.
Generating Server Code
The gRPC server runs on the bank side, and the XD Instant Payments will act as a client.
Follow the below steps to generate the Go code for server side:
Download the Protocol Buffers (.proto) file
Navigate to the directory where the (.proto) file is located using
cd
command in the terminal or command line interface (CLI)Enter the below command to generate the server code
- Go
- C#
- Python
- Java
protoc --go_out=. --go-grpc_out=. pb.proto
Plugin Required: grpc_csharp_plugin
protoc --csharp_out=. --grpc_out=. --plugin=protoc-gen-grpc=/path/to/grpc_csharp_plugin pb.proto
Package Required: grpcio-tools
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. pb.proto
Plugin Required: protoc-gen-grpc-java
protoc --java_out=. --grpc-java_out=. pb.proto
The server code files are generated in the current directory
Setting up the gRPC server
Create a server file (Example: server.go) & implement the server interface, defined in the generated server code
Example Server Code
- Go
- C#
- Python
- Java
package main
import (
"context"
"log"
"net"
"google.golang.org/grpc"
pb "path/of/your/generated/stub"
)
type server struct {
pb.UnimplementedInboundServiceServer
}
func (s *server) CreditTransferInbound(ctx context.Context, in *pb.InboundRequest) (*pb.Response, error) {
log.Printf("CreditTransferInbound Request %v", in)
// business logic to validate and approve or reject the transaction
return &pb.Response{ReferenceNumber: in.ReferenceNumber, TransactionType: in.TransactionType, Status: "ACTC"}, nil
}
func main() {
// Create a listener on TCP port 50051
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// Create a gRPC server object without TLS/SSL
s := grpc.NewServer() // without TLS/SSL
// Register the service with the server
pb.RegisterInboundServiceServer(s, &server{})
// Start the server
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using pb;
public class InboundServiceImpl : InboundService.InboundServiceBase {
private readonly ILogger<InboundServiceImpl> _logger;
public InboundServiceImpl(ILogger<InboundServiceImpl> logger) {
_logger = logger;
}
public override Task<Response> CreditTransferInbound(InboundRequest request, ServerCallContext context) {
_logger.LogInformation("CreditTransferInbound Request: {0}", request);
var response = new Response {
ReferenceNumber = request.ReferenceNumber,
TransactionType = request.TransactionType,
Status = "ACTC"
};
return Task.FromResult(response);
}
}
class Program {
const int Port = 50051;
public static void Main(string[] args) {
var server = new Server {
Services = { InboundService.BindService(new InboundServiceImpl(new LoggerFactory().CreateLogger<InboundServiceImpl>())) },
Ports = { new ServerPort("localhost", Port, ServerCredentials.Insecure) }
};
server.Start();
Console.WriteLine("Server listening on port " + Port);
Console.WriteLine("Press any key to stop the server...");
Console.ReadKey();
server.ShutdownAsync().Wait();
}
}
from concurrent import futures
import logging
import grpc
import pb2
import pb2_grpc
class InboundService(pb2_grpc.InboundServiceServicer):
def CreditTransferInbound(self, request, context):
logging.info(f"CreditTransferInbound Request: {request}")
response = pb2.Response(
ReferenceNumber=request.ReferenceNumber,
TransactionType=request.TransactionType,
Status="ACTC"
)
return response
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
pb2_grpc.add_InboundServiceServicer_to_server(InboundService(), server)
server.add_insecure_port('[::]:50051')
server.start()
logging.info('Server started, listening on port 50051')
server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
serve()
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.logging.Logger;
import pb.InboundServiceGrpc;
import pb.InboundServiceOuterClass.InboundRequest;
import pb.InboundServiceOuterClass.Response;
public class GRPCServer {
private static final Logger logger = Logger.getLogger(GRPCServer.class.getName());
static class InboundServiceImpl extends InboundServiceGrpc.InboundServiceImplBase {
@Override
public void creditTransferInbound(InboundRequest request, StreamObserver<Response> responseObserver) {
logger.info("CreditTransferInbound Request: " + request);
Response response = Response.newBuilder()
.setReferenceNumber(request.getReferenceNumber())
.setTransactionType(request.getTransactionType())
.setStatus("ACTC")
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
Server server = ServerBuilder.forPort(50051)
.addService(new InboundServiceImpl())
.build()
.start();
logger.info("Server started, listening on " + server.getPort());
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Shutting down gRPC server");
server.shutdown();
logger.info("Server shut down");
}));
server.awaitTermination();
}
}
Authentication and Security
The gRPC API supports several authentication mechanisms:
- Secure Sockets Layer/Transport Layer Security (SSL/TLS)
- Basic Credential Authentication (Basic Auth)
- Digital Signature
Setting up the gRPC server with SSL/TLS
Using SSL/TLS with gRPC provides a secure channel for communication, by encrypting the data and ensuring trust between clients and servers. Follow the below steps to configure gRPC server with SSL/TLS.
Create a server file (Example: server.go) & implement the server interface, defined in the generated server code
Load the server's certificate and private key, and create the credentials using
credentials.NewServerTLSFromFile(certFile, keyFile)
Pass the credentials to the gRPC server using
grpc.NewServer(grpc.Creds(creds))
Example Go Server Code
- Go
- C#
- Python
- Java
package main
import (
"context"
"log"
"net"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
pb "path/of/your/generated/stub"
)
type server struct {
pb.UnimplementedInboundServiceServer
}
func (s *server) CreditTransferInbound(ctx context.Context, in *pb.InboundRequest) (*pb.Response, error) {
log.Printf("CreditTransferInbound Request %v", in)
// business logic to validate and approve or reject the transaction
return &pb.Response{ReferenceNumber: in.ReferenceNumber, TransactionType: in.TransactionType, Status: "ACTC"}, nil
}
func main() {
// Create a listener on TCP port 50051
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// Load the SSL/TLS Certificates from disk
certFile := "path/to/server.crt"
keyFile := "path/to/server.key"
creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
if err != nil {
log.Fatalf("failed to load TLS credentials: %v", err)
}
// Create a gRPC server object with the SSL/TLS credentials
s := grpc.NewServer()
s := grpc.NewServer(grpc.Creds(creds))
// Register the service with the server
pb.RegisterInboundServiceServer(s, &server{})
// Start the server
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
using Grpc.Core;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Threading.Tasks;
namespace GrpcServer
{
public class Program
{
public static void Main(string[] args)
{
var server = new Server
{
Services = { InboundService.BindService(new InboundServiceImpl()) },
Ports = { new ServerPort("localhost", 50051, new SslServerCredentials(new List<KeyCertificatePair> {
new KeyCertificatePair(
File.ReadAllText("path/to/server.crt"),
File.ReadAllText("path/to/server.key")
)
})) }
};
server.Start();
Console.WriteLine("Server listening on port 50051");
Console.ReadKey();
server.ShutdownAsync().Wait();
}
}
public class InboundServiceImpl : InboundService.InboundServiceBase
{
private readonly ILogger<InboundServiceImpl> _logger;
public InboundServiceImpl(ILogger<InboundServiceImpl> logger)
{
_logger = logger;
}
public override Task<Response> CreditTransferInbound(InboundRequest request, ServerCallContext context)
{
_logger.LogInformation($"CreditTransferInbound Request {request}");
return Task.FromResult(new Response
{
ReferenceNumber = request.ReferenceNumber,
TransactionType = request.TransactionType,
Status = "ACTC"
});
}
}
}
from concurrent import futures
import logging
import grpc
from grpc import ssl_server_credentials
import inbound_pb2
import inbound_pb2_grpc
class InboundServiceServicer(inbound_pb2_grpc.InboundServiceServicer):
def CreditTransferInbound(self, request, context):
logging.info(f"CreditTransferInbound Request {request}")
return inbound_pb2.Response(
reference_number=request.reference_number,
transaction_type=request.transaction_type,
status="ACTC"
)
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
inbound_pb2_grpc.add_InboundServiceServicer_to_server(InboundServiceServicer(), server)
with open('path/to/server.crt', 'rb') as f:
certificate_chain = f.read()
with open('path/to/server.key', 'rb') as f:
private_key = f.read()
server_credentials = grpc.ssl_server_credentials(((private_key, certificate_chain),))
server.add_secure_port('[::]:50051', server_credentials)
server.start()
logging.info("Server listening on port 50051")
server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
serve()
import io.grpc.Server;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.IOException;
import java.util.logging.Logger;
public class InboundServiceServer {
private static final Logger logger = Logger.getLogger(InboundServiceServer.class.getName());
private Server server;
public static void main(String[] args) throws IOException, InterruptedException {
final InboundServiceServer server = new InboundServiceServer();
server.start();
server.blockUntilShutdown();
}
private void start() throws IOException {
SslContext sslContext = GrpcSslContexts.configure(
SslContextBuilder.forServer(new File("path/to/server.crt"), new File("path/to/server.key"))
).build();
server = NettyServerBuilder.forPort(50051)
.sslContext(sslContext)
.addService(new InboundServiceImpl())
.build()
.start();
logger.info("Server started, listening on " + 50051);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.err.println("*** shutting down gRPC server since JVM is shutting down");
InboundServiceServer.this.stop();
System.err.println("*** server shut down");
}));
}
private void stop() {
if (server != null) {
server.shutdown();
}
}
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
static class InboundServiceImpl extends InboundServiceGrpc.InboundServiceImplBase {
@Override
public void creditTransferInbound(InboundRequest request, StreamObserver<Response> responseObserver) {
logger.info("CreditTransferInbound Request " + request);
Response response = Response.newBuilder()
.setReferenceNumber(request.getReferenceNumber())
.setTransactionType(request.getTransactionType())
.setStatus("ACTC")
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
}
Basic Auth and Digital Signature
To use Basic Auth and Digital Signature, include the below metadata in the request payload :
- Go
- C#
- Python
- Java
authHeader := "Basic {{base64_encoded_credentials}}"
signature := "{{signature}}"
md := metadata.Pairs(
"authorization", authHeader,
"signature", signature,
)
ctx = metadata.NewOutgoingContext(ctx, md)
string base64EncodedCredentials = "your-base64-encoded-credentials";
string algorithm = "your-algorithm";
string keyId = "your-key-id";
string signature = "your-signature";
var headers = new Metadata
{
{ "authorization", $"Basic {base64EncodedCredentials}" },
{ "signature", $"algorithm:{algorithm} keyId:{keyId} signature:{signature}" }
};
base64_encoded_credentials = "your_base64_encoded_credentials"
algorithm = "your_algorithm"
keyId = "your_keyId"
signature = "your_signature"
headers = [('authorization', f'Basic {base64_encoded_credentials}'),('signature', f'algorithm:{algorithm} keyId:{keyId} signature:{signature}')]
String authHeader = "Basic base64_encoded_credentials";
String algorithm = "your-algorithm";
String keyId = "your-key-id";
String sig = "your-signature";
String signature = String.format("algorithm:%s keyId:%s signature:%s", algorithm, keyId, sig);
Metadata md = new Metadata();
Metadata.Key<String> authKey = Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<String> signatureKey = Metadata.Key.of("signature", Metadata.ASCII_STRING_MARSHALLER);
md.put(authKey, authHeader);
md.put(signatureKey, signature);
Sample code for gRPC Server Signature Verification
- Go
- C#
- Python
- Java
package main
import (
"context"
"crypto/ecdsa"
"crypto/sha256"
"crypto/x509"
"encoding/asn1"
"encoding/base64"
"encoding/pem"
"fmt"
"log"
"math/big"
"net"
"strings"
pb "path/to/your/generated/stub"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)
const (
algorithmECDSASHA256 = "ECDSA-SHA256"
signatureHeader = "signature"
)
var publicKey = []byte(`-----BEGIN PUBLIC KEY-----
.....
-----END PUBLIC KEY-----`)
type Verifier struct {
publicKey *ecdsa.PublicKey
}
type payloadKey struct{}
type payload struct {
Data []byte
}
type payloadHandler struct{}
func (h *payloadHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
var p payload
return context.WithValue(ctx, payloadKey{}, &p)
}
func (h *payloadHandler) HandleRPC(ctx context.Context, st stats.RPCStats) {
switch s := st.(type) {
case *stats.InPayload:
if p, ok := ctx.Value(payloadKey{}).(*payload); ok {
p.Data = s.Data
}
default:
}
}
func (h *payloadHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}
func (h *payloadHandler) HandleConn(_ context.Context, _ stats.ConnStats) {
}
// pass the public key to the newVerifier function
func newVerifier(publicKeyPEM []byte) (*Verifier, error) {
block, _ := pem.Decode(publicKeyPEM)
if block == nil || !strings.Contains(block.Type, "PUBLIC KEY") {
return nil, fmt.Errorf("invalid public key file")
}
publicKey, err := x509.ParsePKIXPublicKey(block.Bytes)
if err != nil {
return nil, err
}
ecdsaPublicKey, ok := publicKey.(*ecdsa.PublicKey)
if !ok {
return nil, fmt.Errorf("invalid public key type")
}
return &Verifier{publicKey: ecdsaPublicKey}, nil
}
type server struct {
pb.UnimplementedInboundServiceServer
}
func main() {
lis, err := net.Listen("tcp", ":5455")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
grpcServer := grpc.NewServer(grpc.StatsHandler(&payloadHandler{}), grpc.UnaryInterceptor(authInterceptor))
pb.RegisterInboundServiceServer(grpcServer, &server{})
log.Printf("gRPC server listening at %v", lis.Addr())
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("Failed to serve: %v", err)
}
}
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
log.Println("Authenticating request...")
value, ok := ctx.Value(payloadKey{}).(*payload)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "failed to extract payload from context")
}
v, _ := newVerifier(publicKey)
if err := v.VerifySignature(ctx, value.Data); err != nil {
log.Printf("Signature verification failed: %v", err)
return &pb.Response{Status: "Signature Verification Failed"}, status.Errorf(codes.FailedPrecondition, "signature verification failed")
}
return handler(ctx, req)
}
func (v *Verifier) VerifySignature(ctx context.Context, rawData []byte) error {
log.Println("Signature verification process started")
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return fmt.Errorf("failed to extract metadata from context")
}
signature := md.Get(signatureHeader)
if len(signature) == 0 {
return fmt.Errorf("no signature found, unable to process request")
}
signatureInfo := parseSignatureInfo(strings.Trim(signature[0], "[]"))
sigBytes, err := base64.StdEncoding.DecodeString(signatureInfo[signatureHeader])
if err != nil {
log.Printf("Error decoding base64 signature: %v", err)
return err
}
switch signatureInfo["algorithm"] {
case algorithmECDSASHA256:
log.Println("Signature algorithm:", algorithmECDSASHA256)
hash := sha256.Sum256(rawData)
var sig struct{ R, S *big.Int }
if _, err := asn1.Unmarshal(sigBytes, &sig); err != nil {
fmt.Println("Error decoding signature:", err)
return fmt.Errorf("error decoding signature")
}
if !ecdsa.Verify(v.publicKey, hash[:], sig.R, sig.S) {
return fmt.Errorf("signature is invalid")
}
log.Println("Signature is valid.")
return nil
default:
return fmt.Errorf("unsupported signature algorithm")
}
}
func parseSignatureInfo(s string) map[string]string {
data := make(map[string]string)
for _, pair := range strings.Split(s, ",") {
parts := strings.SplitN(pair, "=", 2)
if len(parts) != 2 {
log.Printf("Invalid signature info: %s", pair)
continue
}
data[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
}
return data
}
DigitalSignatureVerifier: ECDSA Signature verification
using System;
using System.Security.Cryptography;
namespace Inbound.Services
{
public class VerifySignature
{
private static string GetPemPublicKey()
{
string publicKey = @"-----BEGIN PUBLIC KEY-----
.....
-----END PUBLIC KEY-----";
publicKey = publicKey.Replace("-----BEGIN PUBLIC KEY-----", "")
.Replace("-----END PUBLIC KEY-----", "")
.Replace("\n", "")
.Replace("\r", "");
return publicKey;
}
public static bool Verify(byte[] message, string base64Signature)
{
byte[] signature = Convert.FromBase64String(base64Signature);
using (ECDsa ecdsa = ECDsa.Create())
{
ecdsa.ImportSubjectPublicKeyInfo(Convert.FromBase64String(GetPemPublicKey()), out _);
bool isVerified = ecdsa.VerifyData(message, signature, HashAlgorithmName.SHA256, DSASignatureFormat.Rfc3279DerSequence);
Console.WriteLine($"Signature verification result: {isVerified}");
return isVerified;
}
}
}
}
AuthInterceptor: Intercepts gRPC requests to perform signature verification
using Grpc.Core;
using Grpc.Core.Interceptors;
using Google.Protobuf;
using System;
using System.Threading.Tasks;
namespace Inbound.Services
{
public class AuthInterceptor : Interceptor
{
public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
TRequest request,
ServerCallContext context,
UnaryServerMethod<TRequest, TResponse> continuation)
{
Console.WriteLine($"Received request of type: {typeof(TRequest).Name}");
var signature = context.RequestHeaders.GetValue("signature");
if (signature == null)
{
throw new RpcException(new Status(StatusCode.Unauthenticated, "Signature is missing"));
}
string signatureValue = GetSignatureValue(signature);
byte[] rawPayload = GetRawPayloadBytes(request);
Console.WriteLine($"Signature: {signatureValue}");
if (!string.IsNullOrEmpty(signatureValue) && !VerifySignature.Verify(rawPayload, signatureValue))
{
Console.WriteLine("Invalid signature");
throw new RpcException(new Status(StatusCode.Unauthenticated, "Invalid signature"));
}
var response = await continuation(request, context);
return response;
}
private string GetSignatureValue(string headerValue)
{
var parts = headerValue.Split(',');
var signaturePart = Array.Find(parts, p => p.Trim().StartsWith("signature="));
return signaturePart?.Trim().Substring("signature=".Length) ?? string.Empty;
}
private byte[] GetRawPayloadBytes<TRequest>(TRequest request)
{
if (request is IMessage message)
{
return message.ToByteArray();
}
throw new InvalidOperationException("Request is not a protobuf message.");
}
}
}
Code snippet of dependencies required for the Python gRPC:
python3 -m grpc_tools.protoc -I {dependency-path} --python_out={path} --grpc_python_out={path} {proto-path}
from concurrent import futures
import time
import grpc
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
import base64
import proto_pb2 # Adjust import as per your package structure
import proto_pb2_grpc # Adjust import as per your package structure
def split_info(s):
pairs = s.split(",")
data = {}
for pair in pairs:
parts = pair.split("=", 1)
if len(parts) != 2:
print("Invalid input")
return None
key = parts[0].strip()
value = parts[1].strip()
data[key] = value
return data
class InboundServiceServicer(proto_pb2_grpc.InboundServiceServicer):
def GRPCPing(self, request, context):
return proto_pb2.Response(
status="Pinged successfully",
)
class RawPayloadInterceptor(grpc.ServerInterceptor):
def __init__(self, public_key_pem):
self.public_key = load_pem_public_key(public_key_pem.encode())
def intercept_service(self, continuation, handler_call_details):
handler = continuation(handler_call_details)
if handler is None:
return None
if handler.request_streaming or handler.response_streaming:
return handler
return grpc.unary_unary_rpc_method_handler(
self._intercept_unary_unary(handler.unary_unary),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
def _intercept_unary_unary(self, handler):
def wrapper(request, context):
raw_payload = request.SerializeToString()
metadata = dict(context.invocation_metadata())
try:
signature = metadata.get('signature')
if not signature:
context.abort(grpc.StatusCode.UNAUTHENTICATED, "Missing signature")
info = split_info(signature)
signature_bytes = base64.b64decode(info["signature"])
self.public_key.verify(
signature_bytes,
raw_payload,
ec.ECDSA(hashes.SHA256())
)
print("Signature is valid")
except (InvalidSignature, ValueError, TypeError) as e:
print(f"Signature verification failed: {e}")
context.abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid signature")
try:
response = handler(request, context)
except grpc.RpcError as e:
print(f"Error during RPC: {e}")
raise
return response
return wrapper
def get_auth_interceptor():
public_key_pem = """
-----BEGIN PUBLIC KEY-----
....
-----END PUBLIC KEY-----
""" # Replace with your public key
return RawPayloadInterceptor(public_key_pem)
def serve():
executor = futures.ThreadPoolExecutor(max_workers=10)
server = grpc.server(executor, interceptors=[get_auth_interceptor()])
proto_pb2_grpc.add_InboundServiceServicer_to_server(InboundServiceServicer(), server)
server.add_insecure_port('[::]:5455')
server.start()
print("Server started on port 5455")
try:
while True:
time.sleep(86400) # Sleep for one day
except KeyboardInterrupt:
server.stop(0)
print("Server stopped")
if __name__ == '__main__':
serve()
Code snippet of Maven dependencies required for the Java gRPC:
io.grpc : protoc-gen-grpc-java
io.grpc : grpc-netty-shaded
io.grpc : grpc-stub
io.grpc : grpc-protobuf
com.google.protobuf : protobuf-java
org.bouncycastle : bcprov-jdk15on
package com.netxd;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.InvalidKeyException;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
import java.security.PublicKey;
import java.security.Security;
import java.security.Signature;
import java.security.SignatureException;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import com.google.protobuf.InvalidProtocolBufferException;
import Inbound.InboundRequest;
import Inbound.InboundServiceGrpc;
import Inbound.Response;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
public class GrpcServer {
private static final String algorithmECDSASHA256 = "ECDSA-SHA256";
private static final String signatureHeader = "signature";
static {
Security.addProvider(new BouncyCastleProvider());
}
public static void main(String[] args) throws IOException {
Server server = NettyServerBuilder.forAddress(new InetSocketAddress("localhost", 5455))
.addService(new InboundServiceImpl())
.intercept(new AuthInterceptor())
.build();
server.start();
System.out.println("gRPC server listening at " + server.getPort());
Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown));
try {
server.awaitTermination();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
static class AuthInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
System.out.println("Authenticating request...");
String signature = headers.get(Metadata.Key.of(signatureHeader, Metadata.ASCII_STRING_MARSHALLER));
if (signature == null) {
call.close(Status.UNAUTHENTICATED.withDescription("No signature found"), headers);
return new ServerCall.Listener<ReqT>() {};
}
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(call, headers)) {
private ReqT request;
@Override
public void onMessage(ReqT message) {
this.request = message;
System.out.println("received message : "+ message);
super.onMessage(message);
}
@Override
public void onHalfClose() {
try {
Verifier verifier = Verifier.newVerifier("-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE2te/MU1irNP2wM4wv8mNuduEsMnE\n+Gl4NgcEDS8aZxF8tsXivA+tngNgdrBSJ8raUV3w/8Z+UmVgeuX0UelynA==\n-----END PUBLIC KEY-----".getBytes());
ServerCall.Listener<ReqT> listener = super.delegate();
// Assume req is the InboundRequest type
InboundRequest req = (InboundRequest) request;
System.err.println("req : "+ req);
if (verifier.verifySignature(req, signature)) {
super.onHalfClose();
} else {
call.close(Status.UNAUTHENTICATED.withDescription("Signature verification failed"), headers);
}
} catch (Exception e) {
call.close(Status.UNAUTHENTICATED.withDescription("Signature verification error: " + e.getMessage()), headers);
}
}
};
}
}
static class InboundServiceImpl extends InboundServiceGrpc.InboundServiceImplBase {
public void creditTransferInbound(InboundRequest request, StreamObserver<Response> responseObserver) {
// Handle the actual RPC call here
Response response = Response.newBuilder().setStatus("Success").build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
public void gRPCPing(InboundRequest request, StreamObserver<Response> responseObserver) {
// Handle the actual RPC call here
Response response = Response.newBuilder().setStatus("Success").build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
// Implement others methods available on the generated stub InboundServiceImplBase class
}
static class Verifier {
private PublicKey publicKey;
private String pemStr;
public Verifier(PublicKey publicKey,String pemStr) {
this.publicKey = publicKey;
this.pemStr = pemStr;
}
public static Verifier newVerifier(byte[] publicKeyPEM) throws Exception {
String pem = new String(publicKeyPEM);
// String pem = "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE2te/MU1irNP2wM4wv8mNuduEsMnE\n+Gl4NgcEDS8aZxF8tsXivA+tngNgdrBSJ8raUV3w/8Z+UmVgeuX0UelynA==/n-----END PUBLIC KEY-----";
String pemNew = pem.replace("-----BEGIN PUBLIC KEY-----", "").replace("-----END PUBLIC KEY-----", "").replaceAll("\\s", "");
byte[] encoded = Base64.getDecoder().decode(pemNew);
X509EncodedKeySpec keySpec = new X509EncodedKeySpec(encoded);
KeyFactory keyFactory = KeyFactory.getInstance("ECDSA");
PublicKey publicKey = keyFactory.generatePublic(keySpec);
return new Verifier(publicKey, pem);
}
public boolean verifySignature(InboundRequest in, String signature) throws InvalidProtocolBufferException, NoSuchAlgorithmException, InvalidKeyException, SignatureException {
byte[] oData = in.toByteArray();
Map<String, String> signatureInfo = parseSignatureInfo(signature);
System.out.println("signatureInfo : "+signatureInfo);
byte[] sigBytes = Base64.getDecoder().decode(signatureInfo.get("signature"));
if (algorithmECDSASHA256.equals(signatureInfo.get("algorithm"))) {
Signature ecdsaVerify = Signature.getInstance("SHA256withECDSA");
ecdsaVerify.initVerify(this.publicKey);
ecdsaVerify.update(oData);
return ecdsaVerify.verify(sigBytes);
} else {
throw new IllegalArgumentException("Unsupported signature algorithm");
}
}
private Map<String, String> parseSignatureInfo(String s) {
Map<String, String> data = new HashMap<>();
String[] pairs = s.split(",");
for (String pair : pairs) {
String[] parts = pair.split("=");
if (parts.length == 2) {
data.put(parts[0].trim(), parts[1].trim());
}
}
return data;
}
}
}